Apache Pulsar 正式引入 Cloud Storage Sink 连接器:实现 Apache Pulsar 数据上云
越来越多的企业选择将数据存储到云平台中。对于大部分软件体系结构而言,“数据上云”至关重要。将数据迁移上云,有助于降低企业采购软硬件的成本,减少监控、管理工作,提供较大存储容量。而且,云存储支持数据备份,保护数据免受勒索软件的侵害。
许多 Pulsar 用户选择将数据存储在各种云平台中,例如 Amazon Simple Storage Service(Amazon S3)或 Google Cloud Storage(Google GCS)等。如果没有统一的应用程序将主题级别的数据迁移到云存储,Pulsar 用户必须自己编写解决方案。这是一项繁琐的任务。
今天,我们很高兴地宣布 Apache Pulsar 引入 Cloud Storage Sink 连接器(以下简称为 Cloud Storage 连接器)。Cloud Storage 连接器采用简单、可靠的方式,帮助用户将数据从 Apache Pulsar 迁移到云存储的对象中。
什么是 Cloud Storage 连接器
Cloud Storage 连接器定期轮询 Pulsar 数据,然后将其以 Avro、JSON 或 Parquet 格式存储到云存储的对象(AWS S3、Google GCS 等)中。根据用户的环境设置,Cloud Storage 连接器保证向消费者(consumer)“只发送一次” 消息。
Cloud Storage 连接器支持基于 Pulsar 主题分区或者基于时间(以天或小时为单位)的 partitioner。Partitioner 将 Pulsar 主题分区拆分成为多个数据块。数据块相当于云存储中的对象,其虚拟路径使用 Pulsar 分区 ID和该数据块的起始偏移量进行编码。对 Pulsar 分区和该数据块的起始偏移量进行编码。数据块的大小取决于云存储写入的记录的数量和 schema 兼容性。如果没有在配置中指定 partitioner,则使用保留 Pulsar 分区的缺省 partitioner。
Cloud Storage 连接器支持以下功能:
Cloud Storage 使用 partitioner 导出 Pulsar 数据,这就确保 Cloud Storage 连接器能够实现“只发送一次”数据,从而满足云存储的数据一致性要求。
Cloud Storage 连接器支持将采用 Avro、JSON 或 Parquet 格式的数据写入云存储的对象中。通常情况下,只要数据结构支持 Format 接口,Cloud Storage 连接器就可以将该数据迁移到云平台。
Cloud Storage 连接器使用 Pulsar 消息的 publishTime 时间戳定义 TimeBasedPartitioner 类。支持天或小时级别的时间间隔。
Cloud Storage 连接器使用 jclouds 实现云存储。用户可以使用 jclouds 的对象存储拓展 JAR 包来支持更多服务提供商。如需自定义连接到其他服务提供商所需的密钥,可以通过服务提供商接口(Service Provider Interface,SPI)注册 org.apache.pulsar.io.jcloud.credential.JcloudsCredential。
为什么需要 Cloud Storage 连接器
Apache Pulsar 提供丰富的连接器生态系统,将 Pulsar 与其他数据系统连接起来。2018 年 8 月,Apache Pulsar 发布 Pulsar IO。Pulsar IO 允许用户利用现有的 Pulsar Functions 框架,在 Pulsar 和外部系统(例如 MySQL、Kafka)之间传输数据。但是,有些用户希望将数据从 Apache Pulsar 迁移到云存储。这些用户被迫构建定制解决方案并手动运行它们。
试用 Cloud Storage 连接器
前提条件
➡️ 创建 AWS 账户,并登录 AWS Management Console。
➡️ 创建 AWS S3 存储桶。具体操作可参考:
https://docs.aws.amazon.com/AmazonS3/latest/gsg/CreatingABucket.html
➡️ 获取 AWS S3 存储桶的密钥。具体操作可参考:
https://docs.aws.amazon.com/IAM/latest/UserGuide/getting-started_create-admin-group.html
步骤一:
安装 Cloud Storage 连接器并运行 Pulsar Broker
1. 下载 Cloud Storage 连接器的安装包
https://github.com/streamnative/pulsar-io-cloud-storage/releases
2. 将 Cloud Storage 连接器的安装包,添加到 Pulsar broker 的配置文件中。
cp pulsar-io-cloud-storage-2.5.1.nar apache-pulsar-2.6.1/connectors/pulsar-io-cloud-storage-2.5.1.nar
3. 使用 Pulsar broker 的配置文件,启动 Pulsar broker。
cd apache-pulsar-2.6.1
bin/pulsar standalone
步骤 2:
配置并启动 Cloud Storage 连接器
1. 创建 cloud-storage-sink-config.yaml 文件,并在文件中定义 Cloud Storage 连接器的配置,如下所示。
tenant: "public"
namespace: "default"
name: "cloud-storage-sink"
inputs:
- "user-avro-topic"
archive: "connectors/pulsar-io-cloud-storage-2.5.1.nar"
parallelism: 1
configs:
provider: "aws-s3",
accessKeyId: "accessKeyId"
secretAccessKey: "secretAccessKey"
role: ""
roleSessionName: ""
bucket: "s3-sink-test"
region: ""
endpoint: "us-standard"
formatType: "parquet"
partitionerType: "time"
timePartitionPattern: "yyyy-MM-dd"
timePartitionDuration: "1d"
batchSize: 10
batchTimeMs: 1000
2. 使用 cloud-storage-sink-config.yaml 文件,在本地启动 Cloud Storage 连接器。
$PULSAR_HOME/bin/pulsar-admin sink localrun --sink-config-file cloud-storage-sink-config.yaml
步骤3:
发送 Pulsar 消息
try (
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Producer<TestRecord> producer = pulsarClient.newProducer(Schema.AVRO(TestRecord.class))
.topic("public/default/test-parquet-avro")
.create();
) {
List<TestRecord> testRecords = Arrays.asList(
new TestRecord("key1", 1, null),
new TestRecord("key2", 1, new TestRecord.TestSubRecord("aaa"))
);
for (TestRecord record : testRecords) {
producer.send(record);
}
}
步骤 4:
验证 Pulsar 数据完整性
结 语
我们希望这篇文章能够引起您对 Cloud Storage 连接器的兴趣。Cloud Storage 连接器是一个开源项目,采用 Apache License V2。
点击「阅读原文」,下载 Cloud Storage 连接器最新发布版本,开始使用 Cloud Storage 吧!
👍 推荐阅读